package com.gitee.aleenjava.aliyun.mqtt.callback;

import com.gitee.aleenjava.aliyun.mqtt.config.MqttSubscriberProperties;
import com.gitee.aleenjava.aliyun.mqtt.model.ApplicationContextHelper;
import com.gitee.aleenjava.aliyun.mqtt.model.SubscriberInfo;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author cheng
 */
@Slf4j
public class MessageMqttCallback implements MqttCallback {

    private final MqttSubscriberProperties mqttSubscriberProperties;
    private MqttClient mqttClient;
    private final List<SubscriberInfo> subscriberInfoList;
    private final MqttConnectOptions options;
    private final String domainUrl;
    private final  Map<String, SubscriberInfo> mappedSubscriberInfo;


    public MessageMqttCallback(String domainUrl, MqttConnectOptions options, MqttClient mqttClient,
                               MqttSubscriberProperties mqttSubscriberProperties,
                               List<SubscriberInfo> subscriberInfoLis) {
        this.domainUrl = domainUrl;
        this.options = options;
        this.mqttClient = mqttClient;
        this.subscriberInfoList = subscriberInfoLis;
        this.mqttSubscriberProperties = mqttSubscriberProperties;
        this.mappedSubscriberInfo = subscriberInfoList.stream().collect(Collectors.toMap(SubscriberInfo::getTopic, (p) -> p));
    }


    @Override
    public void connectionLost(Throwable throwable) {
        String ip = mqttSubscriberProperties.getIp();
        String port = mqttSubscriberProperties.getPort();
        int maxReconnectCount = mqttSubscriberProperties.getClientReconnectCount();
        long reconnectIntervalTime = mqttSubscriberProperties.getClientReconnectIntervalTime();
        log.error("mqtt lost connection message:{}", throwable.getMessage());
        if (!(throwable instanceof MqttException)) {
            return;
        }
        MqttException mqttException = (MqttException) throwable;
        if (mqttException.getReasonCode() != MqttException.REASON_CODE_CONNECTION_LOST) {
            return;
        }
        for (int i = 1; i <= maxReconnectCount; i++) {
            try {
                log.info("mqtt subscribe client {}/{} times try reconnection, connection info ip:{}, port:{}", i, maxReconnectCount, ip, port);
                reConnect(subscriberInfoList);
                log.info("mqtt subscribe client {} times reconnection successful, connection info ip:{}, port:{}", i, ip, port);
                TimeUnit.MILLISECONDS.sleep(reconnectIntervalTime);
                return;
            } catch (Exception e) {
                log.error("mqtt subscribe client {}/{} times reconnection failed, connection ip:{}, port:{}, message:{}", i, maxReconnectCount, ip, port, e.getMessage(), e);
            }
        }
        log.error("mqtt subscribe client reconnection ,reconnection times reached {}times, connection ip:{}, port:{}", maxReconnectCount, ip, port);

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        try {
           // Map<String, SubscriberInfo> mappedSubscriberInfo = subscriberInfoList.stream().collect(Collectors.toMap(SubscriberInfo::getTopic, (p) -> p));
            mappedSubscriberInfo.get(topic).getMqttSubscribeProcessor().process(topic, message);
        } catch (Exception e) {
            log.error("mqtt消费异常, message:{}", e.getMessage(), e);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete topic {}", token.getTopics()[0]);
    }

    /**
     * 断线重连
     *
     * @param subscriberInfoList
     * @throws MqttException
     */
    private void reConnect(List<SubscriberInfo> subscriberInfoList) throws MqttException {
        mqttClient.close(true);
        mqttClient = new MqttClient(domainUrl, mqttSubscriberProperties.getClientId(), new MemoryPersistence());
        mqttClient.setCallback(this);
        mqttClient.connect(options);
        for (SubscriberInfo subscriberInfo : subscriberInfoList) {
            String topic = subscriberInfo.getTopic();
            int qos = subscriberInfo.getQos().getQos();
            mqttClient.subscribe(topic, qos);
        }
    }

}
